[SPARK-22000][SQL] Address missing Upcast in JavaTypeInference.deserializerFor #23854
Conversation
…of field in Java Bean
ok to test
.option("header", "true")
.option("mode", "DROPMALFORMED")
.schema("ref int, userId string, x int, y int")
.load("src/test/resources/test-data/spark-22000.csv")
Do we need to read test data from a file instead of spark.createDataFrame(...)?
I guess not. Let me try changing it to not use a file.
Thanks!
sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
.format("csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.schema("ref int, userId string, x int, y int")
Can you add tests for more types other than int?
Thanks @maropu, I addressed your comments. The code got much longer - in the Java world I had to deal with Row directly, and I also needed to add equals() / hashCode() (hashCode() is technically optional, but best practice is to implement it along with equals()) and toString(). Hope this is OK.
I put some minor comments on the test, but I'm not sure this change matches the JIRA. The issue concerned calling .toString on primitive types, not String. I suggested String.valueOf because it accepts primitives.
This change also alters behavior a little bit: now a null is deserialized as "null", not null. I'm not sure we want that, nor was that an issue before; can this be called on null anyway?
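For illustration only (this is not Spark code): a minimal sketch of the difference under discussion. String.valueOf has overloads that accept primitives directly, and its Object overload turns a null reference into the literal string "null", whereas calling .toString on a null reference throws.

```java
public class ValueOfVsToString {
    public static void main(String[] args) {
        // String.valueOf has primitive overloads, so no method-resolution
        // problems when the argument is an int, double, etc.
        System.out.println(String.valueOf(42));

        // String.valueOf(Object) maps a null reference to the string "null"...
        Object nothing = null;
        System.out.println(String.valueOf(nothing));

        // ...whereas invoking toString() on a null reference throws.
        try {
            System.out.println(nothing.toString());
        } catch (NullPointerException e) {
            System.out.println("NPE as expected");
        }
    }
}
```

This is exactly the behavior change noted above: a null that previously would have failed fast now silently becomes the string "null".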
@@ -115,6 +124,70 @@ public void testBeanWithMapFieldsDeserialization() {
    Assert.assertEquals(records, MAP_RECORDS);
  }

  private static final List<Row> ROWS_SPARK_22000 = new ArrayList<>();
All the static initialization could just be in a static block rather than split it up, but could this all be local to the new test method if that's the only place it's used?
I just followed the approach the test class already uses, but I agree they can be local to the new test method. Will address.
@@ -252,4 +325,116 @@ public String toString() {
    return String.format("[%d,%d]", startTime, endTime);
  }
}

public static class RecordSpark22000 {
final, just to be tidy?
}

@Override
public String toString() {
Does this need toString? I understand hashCode and equals.
It will help compare expected and actual values when a test fails. Otherwise they would be shown the way Object.toString() renders them, which doesn't provide any information about why they are not equal.
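To illustrate the point above, here is a hypothetical bean (not the actual RecordSpark22000 from the PR) following the same pattern: equals()/hashCode() let Assert.assertEquals compare by value, and toString() makes assertion-failure messages show field contents instead of Object's default "ClassName@hashcode" form.

```java
import java.util.Objects;

// Hypothetical bean illustrating the equals/hashCode/toString pattern
// discussed in the review; names are made up for the example.
public class Record {
    private final int ref;
    private final String userId;

    public Record(int ref, String userId) {
        this.ref = ref;
        this.userId = userId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Record)) return false;
        Record other = (Record) o;
        return ref == other.ref && Objects.equals(userId, other.userId);
    }

    @Override
    public int hashCode() {
        // Must be consistent with equals(): equal objects, equal hash codes.
        return Objects.hash(ref, userId);
    }

    @Override
    public String toString() {
        // Readable form for test-failure output.
        return String.format("Record{ref=%d, userId=%s}", ref, userId);
    }
}
```

Without the toString() override, a failed assertEquals would print something like `Record@1b6d3586`, which says nothing about which field differed.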
Test build #102584 has finished for PR 23854 at commit
First of all, I'm not a Catalyst expert (actually I'm a beginner in this area), so I might be wrong - please correct me if I'm mistaken.
Why it was happening: the IntegerType column was matched to the String case. It looks like Spark doesn't strictly require the matching types to be the same; it loosens the restriction and handles some implicit type conversions by leveraging the Java API - and primitive types are not compatible with how String handles them.
Unless I misread the javadoc or the javadoc is wrong: see spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala, lines 199 to 220 at 17d0cfc.
Btw, I guess the case can be added to the new UT - we don't need to worry about it once the case is covered there. I'll try.
I agree with your argument about null; it would never have worked here anyway. It feels like there should be an assertion to make sure, since if a null reaches here it now returns "null" rather than an exception. I also don't know this part well enough, but primitives are matched to String? That sounds weird. It seems like primitives are already handled in …
It wasn't clear from the JIRA, initially, how this even happened; now there's a simple test case attached. Could we paste that in as a test here and see whether it even fails without the change? And if so, how? The test case suggests that a String field is the problem, but it has toString and isn't handled by this part.
The added UT is slightly modified from the attached test case: I just adjusted it based on review comments. The previously added test failed without the change, and the revised test behaves the same.
No, I meant that null has been handled well and the modified code also works well with null. Please find …
So it's completely safe to use …
This would match when the type of the field in the Java class is primitive.
OK, agreed about null. I'm still trying to understand why the change fixes it, though I believe it does. Is it accidental or actually solving the problem? Before your change, which field has the wrong generated code - what type is it? The original test case suggests the String field is the problem, not the primitives. But then I don't get where the problem is coming from. Just trying to understand whether we actually understand why it happens.
The field/column was the String field bound to an IntegerType column. To be honest, I have no idea whether this is intended to support (implicit) type conversions when deserializing, but converting any type to String sounds convenient, and not too weird. Without knowing the history/intention I guess I can't explain more.
@cloud-fan Could you help put some knowledge around this issue, since you look to be the author of the Java bean Encoder? Thanks in advance!
Looks like the Encoder is not aware of the types of the columns in the Dataset: then I understand why it doesn't have a strict type check between columns and fields in the Java bean. Still not sure what the intention is: expect only StringType for a String field so it's safe to call …
OK, I'm getting this now. In the test case attached to the JIRA, the first column 'ref' is (correctly) inferred as IntegerType. The bean class has it as a String. That is arguably an error, but it should be reasonable to make the assignment. Because it goes to a String, it is most correct to call String.valueOf, especially as that is consistent with how other types are handled and how the JDK API is set up, with valueOf methods for these boxed types and String. This should not change results; String.valueOf(Object) of course returns its argument's .toString.
Test build #102598 has finished for PR 23854 at commit
Btw, why don't we cast the internal data into more suitably typed data before deserializing it into the bean, instead of this weird type handling like int -> string in the deserializer? Probably we can add cast exprs before collecting data?
That's probably a better design; this at least fixes the current design and bug. WDYT about proceeding anyway?
Yea, sure. +1 for the fix in the current design. cc: @cloud-fan
Sure, I think that sounds much better, but from the viewpoint of a newcomer to Spark SQL it doesn't seem trivial.
I guess we also need to handle …
We've already got SQL inferred types for Java beans by …
The thing is, the inferred types are not propagated into the deserializer. So if we want to leverage the inferred SQL DataType to avoid dealing with ObjectType, it may also need to be propagated into StaticInvoke (and maybe Invoke as well), which also looks like a non-trivial thing (the code change might be small, but I'm just not sure this is what we want).
Never mind. I realized …
@@ -235,9 +236,6 @@ object JavaTypeInference {
    path :: Nil,
    returnNullable = false)

case c if c == classOf[java.lang.String] =>
  Invoke(path, "toString", ObjectType(classOf[String]))
The ScalaReflection does the same thing - do we have a problem there too? AFAIK the path should be a string type column, and it's always safe to call UTF8String.toString. My gut feeling is, we missed adding an Upcast somewhere in JavaTypeInference.
Re: "AFAIK the path should be a string type column"
The sample code in the JIRA issue tried to bind an IntegerType column to a String field in the Java bean, which looks to break your expectation. (I guess ScalaReflection would not encounter this case.)
Spark doesn't throw an error for this case, though - actually Spark shows undefined behavior: compilation failures on generated code, possibly even runtime exceptions.
Re: "The sample code tried to bind IntegerType column to String field in Java bean"
In Scala, we can also do this and Spark will add Upcast. E.g. spark.range(1).as[String].collect works fine.
I did a quick search and JavaTypeInference has no Upcast. We should fix it and follow ScalaReflection.
Ah OK. I'll check and address it. Maybe it would be a separate PR if it doesn't fix the new test.
Yeah, your suggestion seems to work nicely! I left a comment asking which approach to choose: please compare both approaches and comment. Thanks!
Test build #102751 has finished for PR 23854 at commit
The Java style test may be a false alarm: the failure is not due to a violation but to an inaccessible DTD link.
Retest this please.
Test build #102762 has finished for PR 23854 at commit
Test build #102768 has finished for PR 23854 at commit
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._

private[spark] object DeserializerBuildHelper {
nit: everything in catalyst is considered private; we don't need the private[spark].
case t if t <:< localTypeOf[java.math.BigDecimal] =>
  Invoke(path, "toJavaBigDecimal", ObjectType(classOf[java.math.BigDecimal]),
    returnNullable = false)
createDeserializerForJavaBigDecimal(path, returnNullable = false)
Do you mind taking some time to investigate why the Java side uses returnNullable = true but the Scala side uses returnNullable = false?
createDeserializerForJavaBigInteger, too.
Would you mind if we file a new issue to track the effort to investigate it? IMHO it looks beyond this PR, and I've seen multiple PRs where requirements kept being added until reviewers lost interest and the PR failed to reach a conclusion.
Yea, fine to do it in a followup.
}

/** Returns the current path with a field at ordinal extracted. */
def addToPathOrdinal(
This function is only used in ScalaReflection? If so, how about moving it there and making it private?
It might be weird if we split similar code (basically this only adds index handling on top of addToPath) across multiple places, but no strong opinion. WDYT? If you still feel it's better to move this to ScalaReflection, please let me know and I'll move it.
upCastToExpectedType(newPath, dataType, walkedTypePath)
}

def expressionWithNullSafety(
private?
case c if c == java.lang.Short.TYPE => "toShortArray"
case c if c == java.lang.Byte.TYPE => "toByteArray"
case c if c == java.lang.Boolean.TYPE => "toBooleanArray"
case other => throw new IllegalStateException("expect primitive array element type " +
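As an aside (this is illustrative plain Java, not Spark code): the Scala match above dispatches each primitive element class to a specialized array-extraction method name, and treats anything else as an illegal state. A rough sketch of that lookup, using the method names from the snippet:

```java
import java.util.HashMap;
import java.util.Map;

public class PrimitiveArrayMethods {
    // Maps a primitive element class to the specialized extraction method
    // name, mirroring the case clauses in the Scala match above.
    private static final Map<Class<?>, String> METHODS = new HashMap<>();
    static {
        METHODS.put(short.class, "toShortArray");
        METHODS.put(byte.class, "toByteArray");
        METHODS.put(boolean.class, "toBooleanArray");
    }

    public static String methodFor(Class<?> elementType) {
        String method = METHODS.get(elementType);
        if (method == null) {
            // Corresponds to the `case other => throw ...` branch:
            // non-primitive element types are not expected here.
            throw new IllegalStateException(
                "expect primitive array element type " + elementType);
        }
        return method;
    }
}
```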
Why don't we allow the non-nullable & non-primitive case?
Ah, my bad. It was for sync between ScalaReflection and JavaTypeInference, and I realized it is not exactly the same. I'll roll back the change.
Thanks!
Btw, don't we have any test for this code path? (It seems the latest Jenkins test run passed, though...)
I guess it is very hard to cover all cases, especially since in the Java test we rely on defining an actual bean class to verify. It would be nice to cover more cases if we could define a (column type, field type) matrix, generate classes at runtime, and leverage those classes in the test. Maybe another JIRA issue for this?
@maropu Sorry for the confusion. I got confused while dealing with other things at once. I wasn't wrong, so the change was unnecessary, and I'm rolling it back.
To answer the original comment: the non-nullable & non-primitive case is not possible according to the implementation of inferDataType; nullable = false holds only for the primitive case.
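A small illustration of why that inference holds (plain Java, independent of Spark): a primitive field can never hold null, while its boxed counterpart can, so only primitive-typed fields can safely be inferred as non-nullable.

```java
public class NullabilityDemo {
    // A primitive field: there is no way to store null here, so a schema
    // inferrer can safely mark the corresponding column non-nullable.
    static int primitiveField = 0;

    // A boxed field: null is a legal value, so the column must be nullable.
    static Integer boxedField = null;

    public static void main(String[] args) {
        System.out.println(int.class.isPrimitive());      // primitive
        System.out.println(Integer.class.isPrimitive());  // boxed, not primitive
        System.out.println(boxedField == null);           // null is legal here
    }
}
```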
Re: "non-nullable & non-primitive case => not possible"
I agree with this, and I think the original code style is better, i.e.
val primitiveMethod = ...
primitiveMethod.map {
  ...
}.getOrElse {
  ...
}
We can change the scala side to follow this.
OK. Would we keep mapFunction to deal with the upcast? I guess primitiveMethod.map.getOrElse gets rid of the need for if (elementNullable), but just to confirm: do we want to go back to the original code, where the upcast was not there, or just take the previous code style?
We definitely need to do the upcast; just take the code style.
Thanks for the clarification! Just addressed.
Test build #102782 has finished for PR 23854 at commit
Retest this please.
Test build #102785 has finished for PR 23854 at commit
primitiveMethod.map { method =>
  Invoke(path, method, ObjectType(c))
}.getOrElse {
  Invoke(
What's the difference between this branch and if (elementNullable)? Shall we follow the previous code? E.g.
val primitiveMethod = ...
primitiveMethod.map(...).getOrElse(...)
Hmm... sorry, I think I was a bit confused. After looking into it again, the content before this commit seems correct and also in sync with ScalaReflection. I'll revert it back.
Btw, the difference from the previous code is that the new code handles the upcast, which ScalaReflection also handles.
Test build #102796 has finished for PR 23854 at commit
Thanks, merging to master!
Thanks all for reviewing, providing nice approaches, and finally merging!
…ction and JavaTypeInference

## What changes were proposed in this pull request?

This patch proposes refactoring the `serializerFor` method between `ScalaReflection` and `JavaTypeInference`, being consistent with what we refactored for `deserializerFor` in #23854. This patch also extracts the logic for recording the walked type path, since that logic is duplicated across `serializerFor` and `deserializerFor` in both `ScalaReflection` and `JavaTypeInference`.

## How was this patch tested?

Existing tests.

Closes #23908 from HeartSaVioR/SPARK-27001.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
Spark expects the type of a column and the type of the matching field to be the same when deserializing to an Object, but Spark doesn't actually enforce this (at least for the Java bean encoder), and some users do it anyway and experience undefined behavior: in SPARK-22000, Spark throws a compilation failure on generated code because it calls .toString() against a primitive type.
This doesn't produce an error on the Scala side because ScalaReflection.deserializerFor properly injects Upcast where necessary. This patch proposes applying the same thing to JavaTypeInference.deserializerFor as well.
Credit to @srowen, @maropu, and @cloud-fan since they provided various approaches to solve this.
How was this patch tested?
Added a UT whose query is slightly modified from the sample code attached to the JIRA issue.